package cn.doitedu.rtdw.data_etl;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Real-time feature-engineering ETL job for ad click-through-rate (CTR) prediction.
 *
 * <p>Pipeline (all expressed in Flink SQL):
 * <ol>
 *   <li>read the behavior-log wide table from Kafka topic {@code mall-events-wide};</li>
 *   <li>keep only {@code ad_show} / {@code ad_click} events and flatten the needed
 *       {@code properties} entries;</li>
 *   <li>use MATCH_RECOGNIZE to pair a show with the event immediately following it in the
 *       same (user_id, ad_tracking_id) partition when that event is a click;</li>
 *   <li>enrich each show/click pair with the ad-request features stored in HBase table
 *       {@code ad_request_log} through a processing-time lookup join;</li>
 *   <li>write the enriched rows to Kafka topic {@code adclick-forecast-features}.</li>
 * </ol>
 *
 * @Author: deep as the sea
 * @Site: <a href="http://www.51doit.com">DoIt Education</a>
 * @QQ: 657270652
 * @Date: 2023/2/8
 * @Desc: Learn big data at DoIt Education
 **/
public class E09_EtlJob_AdFeatureEtl {

    /** Kafka brokers shared by the source table and the sink table. */
    private static final String KAFKA_SERVERS = "doitedu:9092";

    /** Checkpoint storage URI used when no override is passed as the first program argument. */
    private static final String DEFAULT_CHECKPOINT_DIR = "file:/d:/checkpoint";

    /**
     * Builds the SQL pipeline and submits the INSERT job.
     *
     * @param args optional; {@code args[0]}, when present, overrides the checkpoint storage URI
     *             (defaults to {@value #DEFAULT_CHECKPOINT_DIR})
     */
    public static void main(String[] args) {
        // Create the streaming execution environment (programming entry point).
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        String checkpointDir = args.length > 0 ? args[0] : DEFAULT_CHECKPOINT_DIR;
        env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
        env.setParallelism(1);

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        registerEventSource(tenv);
        registerAdEventView(tenv);
        registerShowClickView(tenv);
        registerAdRequestTable(tenv);
        registerFeatureView(tenv);
        registerFeatureSink(tenv);

        // NOTE(review): executeSql submits the INSERT job asynchronously and returns right away.
        // If this main must block (e.g. local IDE runs), call .await() on the returned
        // TableResult — confirm against the deployment mode before changing.
        tenv.executeSql("insert into ad_click_pre_sink select * from res");
    }

    /** Registers the Kafka source over the behavior-log wide table, with proctime and rowtime attributes. */
    private static void registerEventSource(StreamTableEnvironment tenv) {
        tenv.executeSql(
                " CREATE TABLE mall_events_commondim_kfksource(          "
                        + "     user_id           INT,                            "
                        + "     username          string,                         "
                        + "     event_time        bigint,                         "
                        + "     event_id          string,                         "
                        + "     device_type       string,                         "
                        + "     properties        map<string,string>,             "
                        + "     register_birthday string,                         "
                        + "     gps_province   STRING,                           "
                        + "     pt AS proctime()  ,                              "
                        + "     rt as to_timestamp_ltz(event_time,3) ,           "
                        // Watermark on rt: MATCH_RECOGNIZE below orders by this event-time attribute.
                        + "     watermark for  rt as rt - interval '0' seconds   "
                        + " ) WITH (                                             "
                        + "  'connector' = 'kafka',                              "
                        + "  'topic' = 'mall-events-wide',                       "
                        + "  'properties.bootstrap.servers' = '" + KAFKA_SERVERS + "',    "
                        + "  'properties.group.id' = 'testGroup',                "
                        + "  'scan.startup.mode' = 'latest-offset',            "
                        + "  'value.format'='json',                              "
                        + "  'value.json.fail-on-missing-field'='false',         "
                        + "  'value.fields-include' = 'EXCEPT_KEY')              ");
    }

    /** Registers view {@code tmp}: ad show/click events with the needed properties flattened into columns. */
    private static void registerAdEventView(StreamTableEnvironment tenv) {
        tenv.executeSql(
                " CREATE TEMPORARY VIEW tmp AS              "
                        + " SELECT                                            "
                        + "   user_id,                                        "
                        + "   event_id,                                       "
                        + "   event_time,                                     "
                        + "   pt,                                             "
                        + "   rt,                                             "
                        + "   properties['creative_id'] as creative_id,       "
                        + "   properties['ad_tracking_id'] as ad_tracking_id  "
                        + " FROM  mall_events_commondim_kfksource             "
                        + " WHERE event_id in ('ad_show','ad_click')          ");
    }

    /**
     * Registers view {@code show_click}: one row per show that is directly followed by a click
     * in the same (user_id, ad_tracking_id) partition (in-stream event correlation via CEP).
     */
    private static void registerShowClickView(StreamTableEnvironment tenv) {
        // ORDER BY in MATCH_RECOGNIZE must start with a time attribute (event time or processing time).
        // Every MEASURES expression needs an alias; they are prefixed with "a_" so they do not
        // clash with the partition columns user_id / ad_tracking_id in the output.
        // MATCH_PROCTIME() is used instead of A.pt because time attributes referenced through a
        // pattern variable are materialized into plain timestamps, while the lookup join on
        // show_click.pt requires a real processing-time attribute.
        // NOTE(review): there is no WITHIN clause, so a show that is never followed by another
        // event in its partition may keep CEP state indefinitely — consider WITHIN or state TTL.
        tenv.executeSql(
                " CREATE TEMPORARY VIEW show_click AS   "
                        + " SELECT                                "
                        + "   a_pt as pt,                         "
                        + "   a_user_id as user_id,               "
                        + "   show_time,                          "
                        + "   click_time,                         "
                        + "   a_creative_id as creative_id,       "
                        + "   a_ad_tracking_id as ad_tracking_id  "
                        + " FROM tmp                              "
                        + " MATCH_RECOGNIZE(                      "
                        + "   PARTITION BY user_id,ad_tracking_id "
                        + "   ORDER BY rt                         "
                        + "   MEASURES                            "
                        + "     MATCH_PROCTIME() as a_pt,         "
                        + "     A.user_id as a_user_id,           "
                        + "     A.event_time as show_time,        "
                        + "     B.event_time as click_time,       "
                        + "     A.creative_id as a_creative_id,   "
                        + "     A.ad_tracking_id as a_ad_tracking_id  "
                        + "   ONE ROW PER MATCH                   "
                        + "   AFTER MATCH SKIP TO NEXT ROW        "
                        + "   PATTERN(A B)                        "
                        + "   DEFINE                              "
                        + "     A AS A.event_id = 'ad_show',      "
                        + "     B AS B.event_id = 'ad_click'      "
                        + " )                                     ");
    }

    /** Registers the HBase table holding ad-request features; row key = ad_tracking_id, column family f. */
    private static void registerAdRequestTable(StreamTableEnvironment tenv) {
        tenv.executeSql("CREATE TABLE ad_request_hbase( " +
                " ad_tracking_id STRING, " +
                " f ROW<requestTime bigint,platform STRING, pageId STRING, " +
                "   locationType STRING, mediaType STRING, returnAdId STRING, creativeId STRING, modelType STRING, " +
                "   client STRING, userFeatures STRING,adFeatures STRING>, " +
                " PRIMARY KEY (ad_tracking_id) NOT ENFORCED " +
                ") WITH (                             " +
                " 'connector' = 'hbase-2.2',          " +
                " 'table-name' = 'ad_request_log',     " +
                " 'zookeeper.quorum' = 'doitedu:2181' " +
                ")");
    }

    /**
     * Registers view {@code res}: show/click pairs enriched with request features via a
     * processing-time lookup join against HBase. Column order matches {@code ad_click_pre_sink}.
     */
    private static void registerFeatureView(StreamTableEnvironment tenv) {
        // HBase qualifiers live inside the ROW-typed column family "f", so they must be
        // addressed as rq.f.<qualifier>; plain rq.<qualifier> does not resolve.
        tenv.executeSql(
                " CREATE TEMPORARY VIEW res AS                          "
                        + " SELECT                                              "
                        + "   sc.user_id                                        "
                        + "  ,sc.show_time                                      "
                        + "  ,sc.click_time                                     "
                        + "  ,sc.creative_id                                    "
                        + "  ,sc.ad_tracking_id                                 "
                        + "  ,rq.f.requestTime   AS requestTime                 "
                        + "  ,rq.f.platform      AS platform                    "
                        + "  ,rq.f.pageId        AS pageId                      "
                        + "  ,rq.f.locationType  AS locationType                "
                        + "  ,rq.f.mediaType     AS mediaType                   "
                        + "  ,rq.f.returnAdId    AS returnAdId                  "
                        + "  ,rq.f.creativeId    AS creativeId                  "
                        + "  ,rq.f.modelType     AS modelType                   "
                        + "  ,rq.f.client        AS client                      "
                        + "  ,rq.f.userFeatures  AS userFeatures                "
                        + "  ,rq.f.adFeatures    AS adFeatures                  "
                        + " FROM show_click sc                                  "
                        + " JOIN ad_request_hbase FOR SYSTEM_TIME AS OF sc.pt AS rq "
                        + " ON sc.ad_tracking_id = rq.ad_tracking_id            ");
    }

    /** Registers the Kafka sink table that receives the CTR-prediction feature rows as JSON. */
    private static void registerFeatureSink(StreamTableEnvironment tenv) {
        // Source-only options (scan.startup.mode, properties.group.id,
        // value.json.fail-on-missing-field) are intentionally omitted: they have no effect on a sink.
        tenv.executeSql(
                " CREATE TABLE ad_click_pre_sink(                  "
                        + "   user_id        INT                           "
                        + "  ,show_time      BIGINT                        "
                        + "  ,click_time     BIGINT                        "
                        + "  ,creative_id    STRING                        "
                        + "  ,ad_tracking_id STRING                        "
                        + "  ,requestTime    BIGINT                        "
                        + "  ,platform       STRING                        "
                        + "  ,pageId         STRING                        "
                        + "  ,locationType   STRING                        "
                        + "  ,mediaType      STRING                        "
                        + "  ,returnAdId     STRING                        "
                        + "  ,creativeId     STRING                        "
                        + "  ,modelType      STRING                        "
                        + "  ,client         STRING                        "
                        + "  ,userFeatures   STRING                        "
                        + "  ,adFeatures     STRING                        "
                        + " ) WITH (                                       "
                        + "  'connector' = 'kafka',                        "
                        + "  'topic' = 'adclick-forecast-features',        "
                        + "  'properties.bootstrap.servers' = '" + KAFKA_SERVERS + "', "
                        + "  'value.format' = 'json',                      "
                        + "  'value.fields-include' = 'EXCEPT_KEY')        ");
    }
}
